package com.cloudwise.isop.drools.listener;

import com.cloudwise.isop.drools.pojo.Metric;
import com.cloudwise.isop.drools.pojo.SourceDataDTO;
import com.cloudwise.isop.drools.utils.JacksonUtils;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;

import java.nio.charset.StandardCharsets;
import java.util.List;

@Slf4j
public class ComsumerListener implements MessageListener<byte[]> {
    private KieSession kieSession;

    public ComsumerListener(KieSession kieSession1){
        this.kieSession = kieSession1;
    }

    @Override
    public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
        try {
            String topic = message.getTopicName();
            String[] strs = topic.split("\\.");
            log.error(strs[1] + "   " + strs[2]);

            Metric metric = new Metric();
            String value = new String(message.getData());
            log.error("数据： " + value);
            List<SourceDataDTO> list = JacksonUtils.json2list(value, SourceDataDTO.class);
            list.forEach(item->{
                item.setDataCriterionNameEn(strs[1]);
                item.setCriterionPointNameEn(strs[2]);
                FactHandle factHandle = kieSession.insert(item);
                kieSession.fireAllRules();
                kieSession.delete(factHandle);
            });

            consumer.acknowledge(message);
        } catch (PulsarClientException e) {
            System.out.println(e.getMessage());
        }
    }
}
